10 Star 38 Fork 19

Gitee 极速下载 / Pion-WebRTC

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
此仓库是为了提升国内下载速度的镜像仓库,每日同步一次。 原始仓库: https://github.com/pion/webrtc
克隆/下载
peerconnection_media_test.go 47.46 KB
一键复制 编辑 原始数据 按行查看 历史
knowmost 提交于 2024-04-28 11:13 . Fix typos in multiple comments
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
7
// SPDX-FileCopyrightText: 2023 The Pion community <https://pion.ly>
// SPDX-License-Identifier: MIT
//go:build !js
// +build !js
package webrtc
import (
"bufio"
"bytes"
"context"
"errors"
"fmt"
"io"
"regexp"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/pion/logging"
"github.com/pion/randutil"
"github.com/pion/rtcp"
"github.com/pion/rtp"
"github.com/pion/sdp/v3"
"github.com/pion/transport/v3/test"
"github.com/pion/transport/v3/vnet"
"github.com/pion/webrtc/v4/internal/util"
"github.com/pion/webrtc/v4/pkg/media"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// Sentinel errors used by the tests in this file. They are wrapped with
// fmt.Errorf and %w so the offending values can be attached to the message.
var (
// errIncomingTrackIDInvalid is reported when a remote track's ID differs from the expected one.
errIncomingTrackIDInvalid = errors.New("incoming Track ID is invalid")
// errIncomingTrackLabelInvalid is reported when a remote track's StreamID differs from the expected one.
errIncomingTrackLabelInvalid = errors.New("incoming Track Label is invalid")
// errNoTransceiverwithMid is reported when no transceiver with the requested mid is found.
errNoTransceiverwithMid = errors.New("no transceiver with mid")
)
/*
Integration test for bi-directional peers.

This asserts we can send RTP and RTCP both ways, and blocks until
each side gets something (and asserts payload contents).
*/
// nolint: gocyclo
func TestPeerConnection_Media_Sample(t *testing.T) {
	const (
		expectedTrackID  = "video"
		expectedStreamID = "pion"
	)

	lim := test.TimeOut(time.Second * 30)
	defer lim.Stop()

	report := test.CheckRoutines(t)
	defer report()

	pcOffer, pcAnswer, err := newPair()
	if err != nil {
		t.Fatal(err)
	}

	// Signalling channels. Each one is closed when its step succeeds; the
	// error-typed ones deliver a value instead when the step fails.
	awaitRTPRecv := make(chan bool)
	awaitRTPRecvClosed := make(chan bool)
	awaitRTPSend := make(chan bool)

	awaitRTCPSenderRecv := make(chan bool)
	awaitRTCPSenderSend := make(chan error)

	awaitRTCPReceiverRecv := make(chan error)
	awaitRTCPReceiverSend := make(chan error)

	trackMetadataValid := make(chan error)

	pcAnswer.OnTrack(func(track *TrackRemote, receiver *RTPReceiver) {
		// Validate the metadata of the incoming track before doing anything else.
		if track.ID() != expectedTrackID {
			trackMetadataValid <- fmt.Errorf("%w: expected(%s) actual(%s)", errIncomingTrackIDInvalid, expectedTrackID, track.ID())
			return
		}

		if track.StreamID() != expectedStreamID {
			trackMetadataValid <- fmt.Errorf("%w: expected(%s) actual(%s)", errIncomingTrackLabelInvalid, expectedStreamID, track.StreamID())
			return
		}
		close(trackMetadataValid)

		// Answerer -> offerer RTCP: keep sending until the offerer's RTPSender
		// has read a packet (awaitRTCPSenderRecv closed) or a write fails.
		go func() {
			for {
				time.Sleep(time.Millisecond * 100)
				if routineErr := pcAnswer.WriteRTCP([]rtcp.Packet{&rtcp.RapidResynchronizationRequest{SenderSSRC: uint32(track.SSRC()), MediaSSRC: uint32(track.SSRC())}}); routineErr != nil {
					awaitRTCPReceiverSend <- routineErr
					return
				}

				select {
				case <-awaitRTCPSenderRecv:
					close(awaitRTCPReceiverSend)
					return
				default:
				}
			}
		}()

		// Offerer -> answerer RTCP: a single successful read on the receiver is enough.
		go func() {
			_, _, routineErr := receiver.Read(make([]byte, 1400))
			if routineErr != nil {
				awaitRTCPReceiverRecv <- routineErr
			} else {
				close(awaitRTCPReceiverRecv)
			}
		}()

		// Read RTP until the track returns an error. awaitRTPRecv is closed
		// exactly once, on the first packet carrying the expected payload.
		haveClosedAwaitRTPRecv := false
		for {
			p, _, routineErr := track.ReadRTP()
			if routineErr != nil {
				close(awaitRTPRecvClosed)
				return
			} else if bytes.Equal(p.Payload, []byte{0x10, 0x00}) && !haveClosedAwaitRTPRecv {
				haveClosedAwaitRTPRecv = true
				close(awaitRTPRecv)
			}
		}
	})

	vp8Track, err := NewTrackLocalStaticSample(RTPCodecCapability{MimeType: MimeTypeVP8}, expectedTrackID, expectedStreamID)
	if err != nil {
		t.Fatal(err)
	}
	sender, err := pcOffer.AddTrack(vp8Track)
	if err != nil {
		t.Fatal(err)
	}

	// Offerer -> answerer RTP: once ICE is connected, write samples until the
	// answerer has seen the expected payload.
	go func() {
		for {
			time.Sleep(time.Millisecond * 100)
			if pcOffer.ICEConnectionState() != ICEConnectionStateConnected {
				continue
			}
			if routineErr := vp8Track.WriteSample(media.Sample{Data: []byte{0x00}, Duration: time.Second}); routineErr != nil {
				fmt.Println(routineErr)
			}

			select {
			case <-awaitRTPRecv:
				close(awaitRTPSend)
				return
			default:
			}
		}
	}()

	// Offerer -> answerer RTCP: keep sending until the answerer's receiver
	// has read a packet or a write fails.
	go func() {
		parameters := sender.GetParameters()

		for {
			time.Sleep(time.Millisecond * 100)
			if routineErr := pcOffer.WriteRTCP([]rtcp.Packet{&rtcp.PictureLossIndication{SenderSSRC: uint32(parameters.Encodings[0].SSRC), MediaSSRC: uint32(parameters.Encodings[0].SSRC)}}); routineErr != nil {
				awaitRTCPSenderSend <- routineErr
				// Stop after reporting the failure, mirroring the answerer's
				// loop above; otherwise this goroutine would keep running
				// (and could block forever on the next send) once the test
				// has already failed.
				return
			}

			select {
			case <-awaitRTCPReceiverRecv:
				close(awaitRTCPSenderSend)
				return
			default:
			}
		}
	}()

	// Answerer -> offerer RTCP: a single successful read on the sender is enough.
	go func() {
		if _, _, routineErr := sender.Read(make([]byte, 1400)); routineErr == nil {
			close(awaitRTCPSenderRecv)
		}
	}()

	assert.NoError(t, signalPair(pcOffer, pcAnswer))

	// A received value (ok == true) means OnTrack reported a metadata mismatch.
	err, ok := <-trackMetadataValid
	if ok {
		t.Fatal(err)
	}

	<-awaitRTPRecv
	<-awaitRTPSend

	<-awaitRTCPSenderRecv
	if err, ok = <-awaitRTCPSenderSend; ok {
		t.Fatal(err)
	}

	<-awaitRTCPReceiverRecv
	if err, ok = <-awaitRTCPReceiverSend; ok {
		t.Fatal(err)
	}

	closePairNow(t, pcOffer, pcAnswer)
	// The OnTrack read loop closes this channel once ReadRTP starts failing.
	<-awaitRTPRecvClosed
}
/*
PeerConnection should be able to be torn down at anytime.

This test adds an input track and asserts
  - OnTrack doesn't fire since no video packets will arrive
  - No goroutine leaks
  - No deadlocks on shutdown
*/
func TestPeerConnection_Media_Shutdown(t *testing.T) {
	iceCompleteAnswer := make(chan struct{})
	iceCompleteOffer := make(chan struct{})

	lim := test.TimeOut(time.Second * 30)
	defer lim.Stop()

	report := test.CheckRoutines(t)
	defer report()

	pcOffer, pcAnswer, err := newPair()
	if err != nil {
		t.Fatal(err)
	}

	_, err = pcOffer.AddTransceiverFromKind(RTPCodecTypeVideo, RTPTransceiverInit{Direction: RTPTransceiverDirectionRecvonly})
	if err != nil {
		t.Fatal(err)
	}

	_, err = pcAnswer.AddTransceiverFromKind(RTPCodecTypeAudio, RTPTransceiverInit{Direction: RTPTransceiverDirectionRecvonly})
	if err != nil {
		t.Fatal(err)
	}

	opusTrack, err := NewTrackLocalStaticSample(RTPCodecCapability{MimeType: MimeTypeOpus}, "audio", "pion1")
	if err != nil {
		t.Fatal(err)
	}
	vp8Track, err := NewTrackLocalStaticSample(RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion2")
	if err != nil {
		t.Fatal(err)
	}

	if _, err = pcOffer.AddTrack(opusTrack); err != nil {
		t.Fatal(err)
	} else if _, err = pcAnswer.AddTrack(vp8Track); err != nil {
		t.Fatal(err)
	}

	// onTrackFired is written from the OnTrack callback and read at the end
	// of the test, so it is guarded by a mutex.
	var onTrackFiredLock sync.Mutex
	onTrackFired := false

	pcAnswer.OnTrack(func(*TrackRemote, *RTPReceiver) {
		onTrackFiredLock.Lock()
		defer onTrackFiredLock.Unlock()
		onTrackFired = true
	})

	pcAnswer.OnICEConnectionStateChange(func(iceState ICEConnectionState) {
		if iceState == ICEConnectionStateConnected {
			close(iceCompleteAnswer)
		}
	})
	pcOffer.OnICEConnectionStateChange(func(iceState ICEConnectionState) {
		if iceState == ICEConnectionStateConnected {
			close(iceCompleteOffer)
		}
	})

	err = signalPair(pcOffer, pcAnswer)
	if err != nil {
		t.Fatal(err)
	}
	<-iceCompleteAnswer
	<-iceCompleteOffer

	// Each PeerConnection should have one sender, two receivers and two transceivers
	for _, pc := range []*PeerConnection{pcOffer, pcAnswer} {
		senders := pc.GetSenders()
		if len(senders) != 1 {
			t.Errorf("Each PeerConnection should have one RTPSender, we have %d", len(senders))
		}

		receivers := pc.GetReceivers()
		if len(receivers) != 2 {
			t.Errorf("Each PeerConnection should have two RTPReceivers, we have %d", len(receivers))
		}

		transceivers := pc.GetTransceivers()
		if len(transceivers) != 2 {
			t.Errorf("Each PeerConnection should have two RTPTransceivers, we have %d", len(transceivers))
		}
	}

	closePairNow(t, pcOffer, pcAnswer)

	// Snapshot the flag and release the lock before failing, so the mutex is
	// never left held when t.Fatalf aborts the test.
	onTrackFiredLock.Lock()
	fired := onTrackFired
	onTrackFiredLock.Unlock()

	if fired {
		t.Fatalf("PeerConnection OnTrack fired even though we got no packets")
	}
}
/*
Integration test for behavior around media and disconnected peers
  - Sending RTP and RTCP to a disconnected Peer shouldn't return an error
*/
func TestPeerConnection_Media_Disconnected(t *testing.T) {
	lim := test.TimeOut(time.Second * 30)
	defer lim.Stop()

	report := test.CheckRoutines(t)
	defer report()

	// NOTE(review): settingEngine and mediaEngine are configured here but are
	// not handed to createVNetPair within this function — confirm they are
	// still needed.
	settingEngine := SettingEngine{}
	settingEngine.SetICETimeouts(time.Second/2, time.Second/2, time.Second/8)

	mediaEngine := &MediaEngine{}
	assert.NoError(t, mediaEngine.RegisterDefaultCodecs())

	pcOffer, pcAnswer, wan := createVNetPair(t)

	// The router filter forwards chunks only while forwardChunks is true;
	// flipping it to false is how the test cuts the connection.
	forwardChunks := &atomicBool{}
	forwardChunks.set(true)
	wan.AddChunkFilter(func(vnet.Chunk) bool {
		return forwardChunks.get()
	})

	vp8Track, err := NewTrackLocalStaticSample(RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion2")
	if err != nil {
		t.Fatal(err)
	}

	vp8Sender, err := pcOffer.AddTrack(vp8Track)
	if err != nil {
		t.Fatal(err)
	}

	haveDisconnected := make(chan error)
	pcOffer.OnICEConnectionStateChange(func(iceState ICEConnectionState) {
		switch iceState {
		case ICEConnectionStateDisconnected:
			close(haveDisconnected)
		case ICEConnectionStateConnected:
			// Don't tear the connection down early: poll until the remote
			// certificate is available (DTLS done) and the answerer has an
			// SCTP association, then start dropping packets.
			for {
				haveRemoteCert := len(vp8Sender.Transport().GetRemoteCertificate()) != 0
				if haveRemoteCert && pcAnswer.sctpTransport.association() != nil {
					break
				}
				time.Sleep(time.Second)
			}
			forwardChunks.set(false)
		}
	})

	if err = signalPair(pcOffer, pcAnswer); err != nil {
		t.Fatal(err)
	}

	// The channel is only ever closed; receiving an actual value is a failure.
	err, ok := <-haveDisconnected
	if ok {
		t.Fatal(err)
	}

	// Writing RTP and RTCP after the disconnect must not produce errors.
	for attempt := 0; attempt <= 5; attempt++ {
		if rtpErr := vp8Track.WriteSample(media.Sample{Data: []byte{0x00}, Duration: time.Second}); rtpErr != nil {
			t.Fatal(rtpErr)
		}
		if rtcpErr := pcOffer.WriteRTCP([]rtcp.Packet{&rtcp.PictureLossIndication{MediaSSRC: 0}}); rtcpErr != nil {
			t.Fatal(rtcpErr)
		}
	}

	assert.NoError(t, wan.Stop())
	closePairNow(t, pcOffer, pcAnswer)
}
// undeclaredSsrcLogger is a logger that discards everything it is given.
// Its only side effect is in Errorf, which closes unhandledSimulcastError
// when called with the incomingUnhandledRTPSsrc format string.
type undeclaredSsrcLogger struct {
	unhandledSimulcastError chan struct{}
}

func (l *undeclaredSsrcLogger) Trace(_ string)                  {}
func (l *undeclaredSsrcLogger) Tracef(_ string, _ ...interface{}) {}
func (l *undeclaredSsrcLogger) Debug(_ string)                  {}
func (l *undeclaredSsrcLogger) Debugf(_ string, _ ...interface{}) {}
func (l *undeclaredSsrcLogger) Info(_ string)                   {}
func (l *undeclaredSsrcLogger) Infof(_ string, _ ...interface{})  {}
func (l *undeclaredSsrcLogger) Warn(_ string)                   {}
func (l *undeclaredSsrcLogger) Warnf(_ string, _ ...interface{})  {}
func (l *undeclaredSsrcLogger) Error(_ string)                  {}

// Errorf ignores every message except the incomingUnhandledRTPSsrc format,
// for which it closes the unhandledSimulcastError channel.
func (l *undeclaredSsrcLogger) Errorf(format string, _ ...interface{}) {
	if format != incomingUnhandledRTPSsrc {
		return
	}
	close(l.unhandledSimulcastError)
}
// undeclaredSsrcLoggerFactory builds undeclaredSsrcLogger values that all
// share the factory's unhandledSimulcastError channel.
type undeclaredSsrcLoggerFactory struct {
	unhandledSimulcastError chan struct{}
}

// NewLogger returns a new undeclaredSsrcLogger; the scope argument is ignored.
func (f *undeclaredSsrcLoggerFactory) NewLogger(_ string) logging.LeveledLogger {
	logger := &undeclaredSsrcLogger{unhandledSimulcastError: f.unhandledSimulcastError}
	return logger
}
// filterSsrc returns offer with every line that starts with "a=ssrc" removed
// (which also drops "a=ssrc-group" lines).
//
// Lines are split with bufio.Scanner and re-joined with "\n", so "\r\n"
// terminators in the input become "\n" in the output and every kept line,
// including the last one, is followed by "\n".
func filterSsrc(offer string) (filteredSDP string) {
	// Build the result in one buffer instead of repeated string
	// concatenation, which copies the accumulated text on every line.
	var filtered strings.Builder
	filtered.Grow(len(offer) + 1)

	scanner := bufio.NewScanner(strings.NewReader(offer))
	for scanner.Scan() {
		line := scanner.Text()
		if strings.HasPrefix(line, "a=ssrc") {
			continue
		}

		filtered.WriteString(line)
		filtered.WriteByte('\n')
	}

	return filtered.String()
}
// TestUndeclaredSSRC exercises offers whose "a=ssrc" lines were removed with filterSsrc.
// If a SessionDescription has a single media section and no SSRC
// assume that it is meant to handle all RTP packets
func TestUndeclaredSSRC(t *testing.T) {
lim := test.TimeOut(time.Second * 30)
defer lim.Stop()
report := test.CheckRoutines(t)
defer report()
// "No SSRC": OnTrack must still fire, and the remote track must carry the
// same StreamID and ID as the local track that was added to the offerer.
t.Run("No SSRC", func(t *testing.T) {
pcOffer, pcAnswer, err := newPair()
assert.NoError(t, err)
vp8Writer, err := NewTrackLocalStaticSample(RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion2")
assert.NoError(t, err)
_, err = pcOffer.AddTrack(vp8Writer)
assert.NoError(t, err)
// Closed by the OnTrack callback once the metadata has been checked.
onTrackFired := make(chan struct{})
pcAnswer.OnTrack(func(trackRemote *TrackRemote, _ *RTPReceiver) {
assert.Equal(t, trackRemote.StreamID(), vp8Writer.StreamID())
assert.Equal(t, trackRemote.ID(), vp8Writer.ID())
close(onTrackFired)
})
offer, err := pcOffer.CreateOffer(nil)
assert.NoError(t, err)
offerGatheringComplete := GatheringCompletePromise(pcOffer)
assert.NoError(t, pcOffer.SetLocalDescription(offer))
<-offerGatheringComplete
// Hand the answerer the gathered local description with its SSRC lines stripped.
offer.SDP = filterSsrc(pcOffer.LocalDescription().SDP)
assert.NoError(t, pcAnswer.SetRemoteDescription(offer))
answer, err := pcAnswer.CreateAnswer(nil)
assert.NoError(t, err)
answerGatheringComplete := GatheringCompletePromise(pcAnswer)
assert.NoError(t, pcAnswer.SetLocalDescription(answer))
<-answerGatheringComplete
assert.NoError(t, pcOffer.SetRemoteDescription(*pcAnswer.LocalDescription()))
// Presumably keeps writing samples until onTrackFired is closed — sendVideoUntilDone is defined elsewhere.
sendVideoUntilDone(onTrackFired, t, []*TrackLocalStaticSample{vp8Writer})
closePairNow(t, pcOffer, pcAnswer)
})
// "Has RID": same stripped offer, but with a RID attribute appended. The test
// finishes once the injected logger's Errorf is called with the
// incomingUnhandledRTPSsrc format, which closes unhandledSimulcastError.
t.Run("Has RID", func(t *testing.T) {
unhandledSimulcastError := make(chan struct{})
m := &MediaEngine{}
assert.NoError(t, m.RegisterDefaultCodecs())
// Install the logger factory so error logs from the PeerConnections reach undeclaredSsrcLogger.
pcOffer, pcAnswer, err := NewAPI(WithSettingEngine(SettingEngine{
LoggerFactory: &undeclaredSsrcLoggerFactory{unhandledSimulcastError},
}), WithMediaEngine(m)).newPair(Configuration{})
assert.NoError(t, err)
vp8Writer, err := NewTrackLocalStaticSample(RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion2")
assert.NoError(t, err)
_, err = pcOffer.AddTrack(vp8Writer)
assert.NoError(t, err)
offer, err := pcOffer.CreateOffer(nil)
assert.NoError(t, err)
offerGatheringComplete := GatheringCompletePromise(pcOffer)
assert.NoError(t, pcOffer.SetLocalDescription(offer))
<-offerGatheringComplete
// Append RID to end of SessionDescription. Will not be considered unhandled anymore
offer.SDP = filterSsrc(pcOffer.LocalDescription().SDP) + "a=" + sdpAttributeRid + "\r\n"
assert.NoError(t, pcAnswer.SetRemoteDescription(offer))
answer, err := pcAnswer.CreateAnswer(nil)
assert.NoError(t, err)
answerGatheringComplete := GatheringCompletePromise(pcAnswer)
assert.NoError(t, pcAnswer.SetLocalDescription(answer))
<-answerGatheringComplete
assert.NoError(t, pcOffer.SetRemoteDescription(*pcAnswer.LocalDescription()))
// Presumably keeps writing samples until unhandledSimulcastError is closed — sendVideoUntilDone is defined elsewhere.
sendVideoUntilDone(unhandledSimulcastError, t, []*TrackLocalStaticSample{vp8Writer})
closePairNow(t, pcOffer, pcAnswer)
})
}
// TestAddTransceiverFromTrackSendOnly asserts that a transceiver created from
// a track with direction sendonly has a sender but no receiver, that the
// PeerConnection then holds exactly one transceiver and one sender, and that
// the generated offer advertises the audio media section as sendonly.
func TestAddTransceiverFromTrackSendOnly(t *testing.T) {
	lim := test.TimeOut(time.Second * 30)
	defer lim.Stop()

	report := test.CheckRoutines(t)
	defer report()

	// Setup failures are fatal: continuing with a nil PeerConnection or
	// transceiver would only panic on the next dereference.
	pc, err := NewPeerConnection(Configuration{})
	if err != nil {
		t.Fatal(err)
	}
	// Deferred after report(), so Close runs before the routine check.
	defer func() { assert.NoError(t, pc.Close()) }()

	track, err := NewTrackLocalStaticSample(
		RTPCodecCapability{MimeType: "audio/Opus"},
		"track-id",
		"stream-id",
	)
	if err != nil {
		t.Fatal(err)
	}

	transceiver, err := pc.AddTransceiverFromTrack(track, RTPTransceiverInit{
		Direction: RTPTransceiverDirectionSendonly,
	})
	if err != nil {
		t.Fatal(err)
	}

	if transceiver.Receiver() != nil {
		t.Errorf("Transceiver shouldn't have a receiver")
	}

	if transceiver.Sender() == nil {
		t.Errorf("Transceiver should have a sender")
	}

	if len(pc.GetTransceivers()) != 1 {
		t.Errorf("PeerConnection should have one transceiver but has %d", len(pc.GetTransceivers()))
	}

	if len(pc.GetSenders()) != 1 {
		t.Errorf("PeerConnection should have one sender but has %d", len(pc.GetSenders()))
	}

	offer, err := pc.CreateOffer(nil)
	if err != nil {
		t.Fatal(err)
	}

	if !offerMediaHasDirection(offer, RTPCodecTypeAudio, RTPTransceiverDirectionSendonly) {
		t.Errorf("Direction on SDP is not %s", RTPTransceiverDirectionSendonly)
	}
}
// TestAddTransceiverFromTrackSendRecv asserts that a transceiver created from
// a track with direction sendrecv has both a sender and a receiver, that the
// PeerConnection then holds exactly one transceiver, and that the generated
// offer advertises the audio media section as sendrecv.
func TestAddTransceiverFromTrackSendRecv(t *testing.T) {
	lim := test.TimeOut(time.Second * 30)
	defer lim.Stop()

	report := test.CheckRoutines(t)
	defer report()

	// Setup failures are fatal: continuing with a nil PeerConnection or
	// transceiver would only panic on the next dereference.
	pc, err := NewPeerConnection(Configuration{})
	if err != nil {
		t.Fatal(err)
	}
	// Deferred after report(), so Close runs before the routine check.
	defer func() { assert.NoError(t, pc.Close()) }()

	track, err := NewTrackLocalStaticSample(
		RTPCodecCapability{MimeType: "audio/Opus"},
		"track-id",
		"stream-id",
	)
	if err != nil {
		t.Fatal(err)
	}

	transceiver, err := pc.AddTransceiverFromTrack(track, RTPTransceiverInit{
		Direction: RTPTransceiverDirectionSendrecv,
	})
	if err != nil {
		t.Fatal(err)
	}

	if transceiver.Receiver() == nil {
		t.Errorf("Transceiver should have a receiver")
	}

	if transceiver.Sender() == nil {
		t.Errorf("Transceiver should have a sender")
	}

	if len(pc.GetTransceivers()) != 1 {
		t.Errorf("PeerConnection should have one transceiver but has %d", len(pc.GetTransceivers()))
	}

	offer, err := pc.CreateOffer(nil)
	if err != nil {
		t.Fatal(err)
	}

	if !offerMediaHasDirection(offer, RTPCodecTypeAudio, RTPTransceiverDirectionSendrecv) {
		t.Errorf("Direction on SDP is not %s", RTPTransceiverDirectionSendrecv)
	}
}
// TestAddTransceiverAddTrack_Reuse asserts that AddTrack reuses an existing
// recvonly video transceiver (also after RemoveTrack has freed it), and that
// a further AddTrack while it is occupied creates a second transceiver.
func TestAddTransceiverAddTrack_Reuse(t *testing.T) {
	pc, err := NewPeerConnection(Configuration{})
	assert.NoError(t, err)

	recvOnly := RTPTransceiverInit{Direction: RTPTransceiverDirectionRecvonly}
	tr, err := pc.AddTransceiverFromKind(RTPCodecTypeVideo, recvOnly)
	assert.NoError(t, err)

	assert.Equal(t, []*RTPTransceiver{tr}, pc.GetTransceivers())

	// addTrack creates a fresh VP8 track, adds it to pc and returns both the
	// track and the sender it was attached to.
	addTrack := func() (TrackLocal, *RTPSender) {
		localTrack, trackErr := NewTrackLocalStaticSample(RTPCodecCapability{MimeType: MimeTypeVP8}, "foo", "bar")
		assert.NoError(t, trackErr)

		rtpSender, addErr := pc.AddTrack(localTrack)
		assert.NoError(t, addErr)

		return localTrack, rtpSender
	}

	// First track lands on the pre-existing transceiver.
	track1, sender1 := addTrack()
	assert.Equal(t, 1, len(pc.GetTransceivers()))
	assert.Equal(t, sender1, tr.Sender())
	assert.Equal(t, track1, tr.Sender().Track())
	require.NoError(t, pc.RemoveTrack(sender1))

	// After removal the same transceiver is reused for the next track.
	track2, _ := addTrack()
	assert.Equal(t, 1, len(pc.GetTransceivers()))
	assert.Equal(t, track2, tr.Sender().Track())

	// A third track, added while the transceiver is in use, needs a new one.
	addTrack()
	assert.Equal(t, 2, len(pc.GetTransceivers()))

	assert.NoError(t, pc.Close())
}
// TestAddTransceiverAddTrack_NewRTPSender_Error asserts that AddTrack returns
// an error while the PeerConnection's dtlsTransport is nil, and that the
// failed call leaves the transceiver count at one.
func TestAddTransceiverAddTrack_NewRTPSender_Error(t *testing.T) {
	pc, err := NewPeerConnection(Configuration{})
	assert.NoError(t, err)

	recvOnly := RTPTransceiverInit{Direction: RTPTransceiverDirectionRecvonly}
	_, err = pc.AddTransceiverFromKind(RTPCodecTypeVideo, recvOnly)
	assert.NoError(t, err)

	// Detach the DTLS transport for the duration of the AddTrack call; it is
	// put back below so the PeerConnection can be closed normally.
	savedTransport := pc.dtlsTransport
	pc.dtlsTransport = nil

	localTrack, err := NewTrackLocalStaticSample(RTPCodecCapability{MimeType: MimeTypeVP8}, "foo", "bar")
	assert.NoError(t, err)

	_, err = pc.AddTrack(localTrack)
	assert.Error(t, err, "DTLSTransport must not be nil")

	assert.Equal(t, 1, len(pc.GetTransceivers()))

	pc.dtlsTransport = savedTransport
	assert.NoError(t, pc.Close())
}
// TestRtpSenderReceiver_ReadClose_Error asserts that Read on a stopped
// RTPSender or RTPReceiver fails with io.ErrClosedPipe.
func TestRtpSenderReceiver_ReadClose_Error(t *testing.T) {
	pc, err := NewPeerConnection(Configuration{})
	assert.NoError(t, err)

	sendRecv := RTPTransceiverInit{Direction: RTPTransceiverDirectionSendrecv}
	tr, err := pc.AddTransceiverFromKind(RTPCodecTypeVideo, sendRecv)
	assert.NoError(t, err)

	sender := tr.Sender()
	receiver := tr.Receiver()

	// Sender: stop, then read.
	assert.NoError(t, sender.Stop())
	_, _, err = sender.Read(make([]byte, 0, 1400))
	assert.ErrorIs(t, err, io.ErrClosedPipe)

	// Receiver: stop, then read.
	assert.NoError(t, receiver.Stop())
	_, _, err = receiver.Read(make([]byte, 0, 1400))
	assert.ErrorIs(t, err, io.ErrClosedPipe)

	assert.NoError(t, pc.Close())
}
// TestAddTransceiverFromKind asserts that a recvonly video transceiver created
// from a kind has a receiver but no sender, and that the generated offer
// advertises the video media section as recvonly.
// nolint: dupl
func TestAddTransceiverFromKind(t *testing.T) {
	lim := test.TimeOut(time.Second * 30)
	defer lim.Stop()

	report := test.CheckRoutines(t)
	defer report()

	// Setup failures are fatal: continuing with a nil PeerConnection or
	// transceiver would only panic on the next dereference.
	pc, err := NewPeerConnection(Configuration{})
	if err != nil {
		t.Fatal(err)
	}
	// Deferred after report(), so Close runs before the routine check.
	defer func() { assert.NoError(t, pc.Close()) }()

	transceiver, err := pc.AddTransceiverFromKind(RTPCodecTypeVideo, RTPTransceiverInit{
		Direction: RTPTransceiverDirectionRecvonly,
	})
	if err != nil {
		t.Fatal(err)
	}

	if transceiver.Receiver() == nil {
		t.Errorf("Transceiver should have a receiver")
	}

	if transceiver.Sender() != nil {
		t.Errorf("Transceiver shouldn't have a sender")
	}

	offer, err := pc.CreateOffer(nil)
	if err != nil {
		t.Fatal(err)
	}

	if !offerMediaHasDirection(offer, RTPCodecTypeVideo, RTPTransceiverDirectionRecvonly) {
		t.Errorf("Direction on SDP is not %s", RTPTransceiverDirectionRecvonly)
	}
}
// TestAddTransceiverFromTrackFailsRecvOnly asserts that AddTransceiverFromTrack
// rejects direction recvonly: it must return a nil transceiver and a non-nil error.
func TestAddTransceiverFromTrackFailsRecvOnly(t *testing.T) {
	lim := test.TimeOut(time.Second * 30)
	defer lim.Stop()

	report := test.CheckRoutines(t)
	defer report()

	// Setup failures are fatal: continuing with a nil PeerConnection would
	// only panic on the next call.
	pc, err := NewPeerConnection(Configuration{})
	if err != nil {
		t.Fatal(err)
	}
	// Deferred after report(), so Close runs before the routine check.
	defer func() { assert.NoError(t, pc.Close()) }()

	track, err := NewTrackLocalStaticSample(
		RTPCodecCapability{MimeType: MimeTypeH264, SDPFmtpLine: "level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=42001f"},
		"track-id",
		"track-label",
	)
	if err != nil {
		t.Fatal(err)
	}

	transceiver, err := pc.AddTransceiverFromTrack(track, RTPTransceiverInit{
		Direction: RTPTransceiverDirectionRecvonly,
	})

	if transceiver != nil {
		t.Error("AddTransceiverFromTrack shouldn't succeed with Direction RTPTransceiverDirectionRecvonly")
	}

	assert.NotNil(t, err)
}
// TestPlanBMediaExchange connects two Plan-B PeerConnections, sends samples on
// one or two VP8 tracks from the offerer and waits until the answerer's
// OnTrack callback has fired once per track.
func TestPlanBMediaExchange(t *testing.T) {
	runTest := func(trackCount int, t *testing.T) {
		// addSingleTrack adds a VP8 track with random track and stream IDs to p.
		addSingleTrack := func(p *PeerConnection) *TrackLocalStaticSample {
			trackID := fmt.Sprintf("video-%d", randutil.NewMathRandomGenerator().Uint32())
			streamID := fmt.Sprintf("video-%d", randutil.NewMathRandomGenerator().Uint32())

			localTrack, trackErr := NewTrackLocalStaticSample(RTPCodecCapability{MimeType: MimeTypeVP8}, trackID, streamID)
			assert.NoError(t, trackErr)

			_, addErr := p.AddTrack(localTrack)
			assert.NoError(t, addErr)

			return localTrack
		}

		pcOffer, err := NewPeerConnection(Configuration{SDPSemantics: SDPSemanticsPlanB})
		assert.NoError(t, err)

		pcAnswer, err := NewPeerConnection(Configuration{SDPSemantics: SDPSemanticsPlanB})
		assert.NoError(t, err)

		// done is closed once OnTrack has fired trackCount times.
		var pendingTracks sync.WaitGroup
		pendingTracks.Add(trackCount)
		pcAnswer.OnTrack(func(*TrackRemote, *RTPReceiver) {
			pendingTracks.Done()
		})

		done := make(chan struct{})
		go func() {
			pendingTracks.Wait()
			close(done)
		}()

		_, err = pcAnswer.AddTransceiverFromKind(RTPCodecTypeVideo)
		assert.NoError(t, err)

		outboundTracks := make([]*TrackLocalStaticSample, 0, trackCount)
		for added := 0; added < trackCount; added++ {
			outboundTracks = append(outboundTracks, addSingleTrack(pcOffer))
		}

		assert.NoError(t, signalPair(pcOffer, pcAnswer))

		// Write one sample per track every 20ms until all tracks were seen.
	sendLoop:
		for {
			select {
			case <-done:
				break sendLoop
			case <-time.After(20 * time.Millisecond):
				for _, outbound := range outboundTracks {
					assert.NoError(t, outbound.WriteSample(media.Sample{Data: []byte{0x00}, Duration: time.Second}))
				}
			}
		}

		closePairNow(t, pcOffer, pcAnswer)
	}

	lim := test.TimeOut(time.Second * 30)
	defer lim.Stop()

	report := test.CheckRoutines(t)
	defer report()

	t.Run("Single Track", func(t *testing.T) {
		runTest(1, t)
	})
	t.Run("Multi Track", func(t *testing.T) {
		runTest(2, t)
	})
}
// TestPeerConnection_Start_Only_Negotiated_Senders tests that only
// the senders of transceivers that were part of the current
// offer/answer are started. A second track is added after the offer
// was created; its sender must not have sent anything once the answer
// has been applied.
func TestPeerConnection_Start_Only_Negotiated_Senders(t *testing.T) {
lim := test.TimeOut(time.Second * 30)
defer lim.Stop()
report := test.CheckRoutines(t)
defer report()
pcOffer, err := NewPeerConnection(Configuration{})
assert.NoError(t, err)
// Deferred after report(), so both Close calls run before the routine check.
defer func() { assert.NoError(t, pcOffer.Close()) }()
pcAnswer, err := NewPeerConnection(Configuration{})
assert.NoError(t, err)
defer func() { assert.NoError(t, pcAnswer.Close()) }()
// track1 is added before the offer is created, so it is part of the negotiation.
track1, err := NewTrackLocalStaticSample(RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion1")
require.NoError(t, err)
sender1, err := pcOffer.AddTrack(track1)
require.NoError(t, err)
// Manual offer/answer exchange, waiting for ICE gathering to complete on each side.
offer, err := pcOffer.CreateOffer(nil)
assert.NoError(t, err)
offerGatheringComplete := GatheringCompletePromise(pcOffer)
assert.NoError(t, pcOffer.SetLocalDescription(offer))
<-offerGatheringComplete
assert.NoError(t, pcAnswer.SetRemoteDescription(*pcOffer.LocalDescription()))
answer, err := pcAnswer.CreateAnswer(nil)
assert.NoError(t, err)
answerGatheringComplete := GatheringCompletePromise(pcAnswer)
assert.NoError(t, pcAnswer.SetLocalDescription(answer))
<-answerGatheringComplete
// Add a new track between providing the offer and applying the answer
track2, err := NewTrackLocalStaticSample(RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion2")
require.NoError(t, err)
sender2, err := pcOffer.AddTrack(track2)
require.NoError(t, err)
// apply answer so we'll test generateMatchedSDP
assert.NoError(t, pcOffer.SetRemoteDescription(*pcAnswer.LocalDescription()))
// Wait for senders to be started by startTransports spawned goroutine
// NOTE(review): presumably ops.Done() blocks until queued operations have run — confirm against the operations type.
pcOffer.ops.Done()
// sender1 should be started but sender2 should not be started
assert.True(t, sender1.hasSent(), "sender1 is not started but should be started")
assert.False(t, sender2.hasSent(), "sender2 is started but should not be started")
}
// TestPeerConnection_Start_Right_Receiver tests that the right
// receiver (the receiver whose transceiver has the same media section as the track)
// is started for the specified track. It negotiates three times: with a
// track, after removing it, and after adding a new transceiver for it.
func TestPeerConnection_Start_Right_Receiver(t *testing.T) {
// isTransceiverReceiverStarted reports whether the transceiver of pc with
// the given mid has a receiver for which haveReceived() is true. It returns
// errNoTransceiverwithMid (wrapped with the mid) when no transceiver matches.
isTransceiverReceiverStarted := func(pc *PeerConnection, mid string) (bool, error) {
for _, transceiver := range pc.GetTransceivers() {
if transceiver.Mid() != mid {
continue
}
return transceiver.Receiver() != nil && transceiver.Receiver().haveReceived(), nil
}
return false, fmt.Errorf("%w: %q", errNoTransceiverwithMid, mid)
}
lim := test.TimeOut(time.Second * 30)
defer lim.Stop()
report := test.CheckRoutines(t)
defer report()
pcOffer, pcAnswer, err := newPair()
require.NoError(t, err)
_, err = pcAnswer.AddTransceiverFromKind(RTPCodecTypeVideo, RTPTransceiverInit{Direction: RTPTransceiverDirectionRecvonly})
assert.NoError(t, err)
track1, err := NewTrackLocalStaticSample(RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion1")
require.NoError(t, err)
sender1, err := pcOffer.AddTrack(track1)
require.NoError(t, err)
// First negotiation: track1 is offered.
assert.NoError(t, signalPair(pcOffer, pcAnswer))
// NOTE(review): presumably ops.Done() waits for pending operations on each side — confirm against the operations type.
pcOffer.ops.Done()
pcAnswer.ops.Done()
// transceiver with mid 0 should be started
started, err := isTransceiverReceiverStarted(pcAnswer, "0")
assert.NoError(t, err)
assert.True(t, started, "transceiver with mid 0 should be started")
// Remove track
assert.NoError(t, pcOffer.RemoveTrack(sender1))
// Second negotiation: the track has been removed.
assert.NoError(t, signalPair(pcOffer, pcAnswer))
pcOffer.ops.Done()
pcAnswer.ops.Done()
// transceiver with mid 0 should not be started
started, err = isTransceiverReceiverStarted(pcAnswer, "0")
assert.NoError(t, err)
assert.False(t, started, "transceiver with mid 0 should not be started")
// Add a new transceiver (we're not using AddTrack since it'll reuse the transceiver with mid 0)
_, err = pcOffer.AddTransceiverFromTrack(track1)
assert.NoError(t, err)
_, err = pcAnswer.AddTransceiverFromKind(RTPCodecTypeVideo, RTPTransceiverInit{Direction: RTPTransceiverDirectionRecvonly})
assert.NoError(t, err)
// Third negotiation: track1 is offered again on the new transceiver.
assert.NoError(t, signalPair(pcOffer, pcAnswer))
pcOffer.ops.Done()
pcAnswer.ops.Done()
// transceiver with mid 0 should not be started
started, err = isTransceiverReceiverStarted(pcAnswer, "0")
assert.NoError(t, err)
assert.False(t, started, "transceiver with mid 0 should not be started")
// transceiver with mid 2 should be started
started, err = isTransceiverReceiverStarted(pcAnswer, "2")
assert.NoError(t, err)
assert.True(t, started, "transceiver with mid 2 should be started")
closePairNow(t, pcOffer, pcAnswer)
}
// TestPeerConnection_Simulcast_Probe covers the handling of RTP that arrives
// with an SSRC the remote description did not announce.
func TestPeerConnection_Simulcast_Probe(t *testing.T) {
	lim := test.TimeOut(time.Second * 30) //nolint
	defer lim.Stop()

	report := test.CheckRoutines(t)
	defer report()

	// Assert that failed Simulcast probing doesn't cause
	// the handleUndeclaredSSRC to be leaked
	t.Run("Leak", func(t *testing.T) {
		track, err := NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion")
		assert.NoError(t, err)

		offerer, answerer, err := newPair()
		assert.NoError(t, err)

		_, err = offerer.AddTrack(track)
		assert.NoError(t, err)

		ticker := time.NewTicker(time.Millisecond * 20)
		defer ticker.Stop()

		testFinished := make(chan struct{})
		writerDone := make(chan struct{})
		seenFiveStreams, seenFiveStreamsCancel := context.WithCancel(context.Background())

		go func() {
			defer close(writerDone)

			ssrcGenerator := randutil.NewMathRandomGenerator()
			for {
				select {
				case <-testFinished:
					return
				case <-ticker.C:
					// Signal the test body once the answerer tracks at least five probed streams.
					answerer.dtlsTransport.lock.Lock()
					if len(answerer.dtlsTransport.simulcastStreams) >= 5 {
						seenFiveStreamsCancel()
					}
					answerer.dtlsTransport.lock.Unlock()

					// Write straight to the binding with a fresh random SSRC on every
					// tick so each packet looks like a new, undeclared stream.
					track.mu.Lock()
					if len(track.bindings) == 1 {
						// Use a goroutine-local error instead of the subtest's err.
						_, writeErr := track.bindings[0].writeStream.WriteRTP(&rtp.Header{
							Version: 2,
							SSRC:    ssrcGenerator.Uint32(),
						}, []byte{0, 1, 2, 3, 4, 5})
						assert.NoError(t, writeErr)
					}
					track.mu.Unlock()
				}
			}
		}()

		assert.NoError(t, signalPair(offerer, answerer))

		peerConnectionConnected := untilConnectionState(PeerConnectionStateConnected, offerer, answerer)
		peerConnectionConnected.Wait()

		<-seenFiveStreams.Done()

		closePairNow(t, offerer, answerer)
		close(testFinished)
		// Make sure the writer goroutine has exited before the subtest returns.
		<-writerDone
	})

	// Assert that NonSimulcast Traffic isn't incorrectly broken by the probe
	t.Run("Break NonSimulcast", func(t *testing.T) {
		unhandledSimulcastError := make(chan struct{})

		m := &MediaEngine{}
		assert.NoError(t, m.RegisterDefaultCodecs())
		assert.NoError(t, ConfigureSimulcastExtensionHeaders(m))

		pcOffer, pcAnswer, err := NewAPI(WithSettingEngine(SettingEngine{
			LoggerFactory: &undeclaredSsrcLoggerFactory{unhandledSimulcastError},
		}), WithMediaEngine(m)).newPair(Configuration{})
		assert.NoError(t, err)

		firstTrack, err := NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: MimeTypeVP8}, "firstTrack", "firstTrack")
		assert.NoError(t, err)

		_, err = pcOffer.AddTrack(firstTrack)
		assert.NoError(t, err)

		secondTrack, err := NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: MimeTypeVP8}, "secondTrack", "secondTrack")
		assert.NoError(t, err)

		_, err = pcOffer.AddTrack(secondTrack)
		assert.NoError(t, err)

		// Every "m=video" line toggles discarding. With the two video sections
		// expected from the two tracks added above, the first section is removed
		// and the second one is kept.
		assert.NoError(t, signalPairWithModification(pcOffer, pcAnswer, func(sessionDescription string) string {
			var filtered strings.Builder
			shouldDiscard := false

			scanner := bufio.NewScanner(strings.NewReader(sessionDescription))
			for scanner.Scan() {
				line := scanner.Text()
				if strings.HasPrefix(line, "m=video") {
					shouldDiscard = !shouldDiscard
				}

				if !shouldDiscard {
					filtered.WriteString(line)
					filtered.WriteString("\r\n")
				}
			}
			assert.NoError(t, scanner.Err())

			return filtered.String()
		}))

		// sendRTPPacket writes one packet on firstTrack with the next sequence
		// number and then waits 20ms.
		sequenceNumber := uint16(0)
		sendRTPPacket := func() {
			sequenceNumber++
			assert.NoError(t, firstTrack.WriteRTP(&rtp.Packet{
				Header: rtp.Header{
					Version:        2,
					SequenceNumber: sequenceNumber,
				},
				Payload: []byte{0x00},
			}))
			time.Sleep(20 * time.Millisecond)
		}

		// NOTE: sendRTPPacket increments sequenceNumber as well, so the counter
		// advances by two per iteration. Kept as-is to preserve the packet count.
		for ; sequenceNumber <= 5; sequenceNumber++ {
			sendRTPPacket()
		}

		// Signal again, this time without modifying the description.
		assert.NoError(t, signalPair(pcOffer, pcAnswer))

		trackRemoteChan := make(chan *TrackRemote, 1)
		pcAnswer.OnTrack(func(trackRemote *TrackRemote, _ *RTPReceiver) {
			trackRemoteChan <- trackRemote
		})

		// Keep sending until OnTrack delivers the remote track.
		trackRemote := func() *TrackRemote {
			for {
				select {
				case remote := <-trackRemoteChan:
					return remote
				default:
					sendRTPPacket()
				}
			}
		}()

		// Keep sending until unhandledSimulcastError is signalled. The channel
		// was handed to undeclaredSsrcLoggerFactory, which is defined outside
		// this function.
		func() {
			for {
				select {
				case <-unhandledSimulcastError:
					return
				default:
					sendRTPPacket()
				}
			}
		}()

		// The non-simulcast track must still be readable afterwards.
		_, _, err = trackRemote.Read(make([]byte, 1500))
		assert.NoError(t, err)

		closePairNow(t, pcOffer, pcAnswer)
	})
}
// Assert that CreateOffer returns an error for a RTPSender with no codecs
// pion/webrtc#1702
func TestPeerConnection_CreateOffer_NoCodecs(t *testing.T) {
	lim := test.TimeOut(time.Second * 30)
	defer lim.Stop()

	report := test.CheckRoutines(t)
	defer report()

	// The MediaEngine is left empty on purpose: no codecs are registered.
	m := &MediaEngine{}

	pc, err := NewAPI(WithMediaEngine(m)).NewPeerConnection(Configuration{})
	// Abort right away on failure; the calls below would dereference a nil pc.
	require.NoError(t, err)

	track, err := NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion")
	assert.NoError(t, err)

	_, err = pc.AddTrack(track)
	assert.NoError(t, err)

	_, err = pc.CreateOffer(nil)
	assert.ErrorIs(t, err, ErrSenderWithNoCodecs)

	assert.NoError(t, pc.Close())
}
// TestPeerConnection_RaceReplaceTrack asserts that AddTrack is thread-safe.
// It adds and removes a batch of tracks, then adds a second batch from
// concurrent goroutines and checks that every one of them is attached to a
// sender.
func TestPeerConnection_RaceReplaceTrack(t *testing.T) {
	const trackCount = 10

	pc, err := NewPeerConnection(Configuration{})
	assert.NoError(t, err)

	// newAttachedTrack creates a VP8 sample track and adds it to pc.
	newAttachedTrack := func() *TrackLocalStaticSample {
		created, createErr := NewTrackLocalStaticSample(RTPCodecCapability{MimeType: MimeTypeVP8}, "foo", "bar")
		assert.NoError(t, createErr)

		_, addErr := pc.AddTrack(created)
		assert.NoError(t, addErr)

		return created
	}

	// attached reports whether any transceiver's sender currently carries want.
	attached := func(want *TrackLocalStaticSample) bool {
		for _, transceiver := range pc.GetTransceivers() {
			if sender := transceiver.Sender(); sender != nil && sender.Track() == want {
				return true
			}
		}
		return false
	}

	// First batch: add sequentially, then remove every sender's track again.
	for n := 0; n < trackCount; n++ {
		newAttachedTrack()
	}
	for _, transceiver := range pc.GetTransceivers() {
		assert.NoError(t, pc.RemoveTrack(transceiver.Sender()))
	}

	// Second batch: add concurrently; each goroutine fills its own slot.
	added := make([]*TrackLocalStaticSample, trackCount)

	var pending sync.WaitGroup
	pending.Add(trackCount)
	for slot := range added {
		go func(idx int) {
			defer pending.Done()
			added[idx] = newAttachedTrack()
		}(slot)
	}
	pending.Wait()

	for _, candidate := range added {
		if attached(candidate) {
			continue
		}
		t.Errorf("track was added but not found on senders")
	}

	assert.NoError(t, pc.Close())
}
// TestPeerConnection_Simulcast sends three simulcast encodings (RIDs "a", "b"
// and "c") of one VP8 track from the offerer to the answerer.
//
// "E2E" asserts that padding-only packets do not complete the probe and that
// OnTrack then fires exactly once per RID. "RTCP" asserts that
// ReadSimulcastRTCP can be called for each RID from OnTrack.
func TestPeerConnection_Simulcast(t *testing.T) {
	lim := test.TimeOut(time.Second * 30)
	defer lim.Stop()

	report := test.CheckRoutines(t)
	defer report()

	rids := []string{"a", "b", "c"}

	t.Run("E2E", func(t *testing.T) {
		pcOffer, pcAnswer, err := newPair()
		assert.NoError(t, err)

		vp8WriterA, err := NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion2", WithRTPStreamID(rids[0]))
		assert.NoError(t, err)

		vp8WriterB, err := NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion2", WithRTPStreamID(rids[1]))
		assert.NoError(t, err)

		vp8WriterC, err := NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion2", WithRTPStreamID(rids[2]))
		assert.NoError(t, err)

		writers := []*TrackLocalStaticRTP{vp8WriterA, vp8WriterB, vp8WriterC}

		sender, err := pcOffer.AddTrack(vp8WriterA)
		assert.NoError(t, err)
		assert.NotNil(t, sender)

		assert.NoError(t, sender.AddEncoding(vp8WriterB))
		assert.NoError(t, sender.AddEncoding(vp8WriterC))

		// ridMap counts how often OnTrack fired for each RID.
		var ridMapLock sync.Mutex
		ridMap := map[string]int{}

		assertRidCorrect := func(t *testing.T) {
			ridMapLock.Lock()
			defer ridMapLock.Unlock()

			for _, rid := range rids {
				assert.Equal(t, 1, ridMap[rid])
			}
			assert.Len(t, ridMap, len(rids))
		}

		ridsFulfilled := func() bool {
			ridMapLock.Lock()
			defer ridMapLock.Unlock()

			return len(ridMap) == len(rids)
		}

		pcAnswer.OnTrack(func(trackRemote *TrackRemote, _ *RTPReceiver) {
			ridMapLock.Lock()
			defer ridMapLock.Unlock()
			ridMap[trackRemote.RID()]++
		})

		parameters := sender.GetParameters()
		// Guard the index accesses below against a short Encodings slice.
		require.Len(t, parameters.Encodings, len(rids))
		for i, rid := range rids {
			assert.Equal(t, rid, parameters.Encodings[i].RID)
		}

		// Look up the negotiated header extension IDs for MID and RID.
		var midID, ridID uint8
		for _, extension := range parameters.HeaderExtensions {
			switch extension.URI {
			case sdp.SDESMidURI:
				midID = uint8(extension.ID)
			case sdp.SDESRTPStreamIDURI:
				ridID = uint8(extension.ID)
			}
		}
		assert.NotZero(t, midID)
		assert.NotZero(t, ridID)

		assert.NoError(t, signalPair(pcOffer, pcAnswer))

		// padding only packets should not affect simulcast probe
		var sequenceNumber uint16
		for sequenceNumber = 0; sequenceNumber < simulcastProbeCount+10; sequenceNumber++ {
			time.Sleep(20 * time.Millisecond)

			for _, track := range writers {
				pkt := &rtp.Packet{
					Header: rtp.Header{
						Version:        2,
						SequenceNumber: sequenceNumber,
						PayloadType:    96,
						Padding:        true,
					},
					Payload: []byte{0x00, 0x02},
				}
				assert.NoError(t, track.WriteRTP(pkt))
			}
		}
		assert.False(t, ridsFulfilled(), "Simulcast probe should not be fulfilled by padding only packets")

		// Send packets carrying MID and RID until OnTrack has fired for every RID.
		for ; !ridsFulfilled(); sequenceNumber++ {
			time.Sleep(20 * time.Millisecond)

			for _, track := range writers {
				pkt := &rtp.Packet{
					Header: rtp.Header{
						Version:        2,
						SequenceNumber: sequenceNumber,
						PayloadType:    96,
					},
					Payload: []byte{0x00},
				}
				assert.NoError(t, pkt.Header.SetExtension(midID, []byte("0")))
				assert.NoError(t, pkt.Header.SetExtension(ridID, []byte(track.RID())))

				assert.NoError(t, track.WriteRTP(pkt))
			}
		}

		assertRidCorrect(t)
		closePairNow(t, pcOffer, pcAnswer)
	})

	t.Run("RTCP", func(t *testing.T) {
		pcOffer, pcAnswer, err := newPair()
		assert.NoError(t, err)

		vp8WriterA, err := NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion2", WithRTPStreamID(rids[0]))
		assert.NoError(t, err)

		vp8WriterB, err := NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion2", WithRTPStreamID(rids[1]))
		assert.NoError(t, err)

		vp8WriterC, err := NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion2", WithRTPStreamID(rids[2]))
		assert.NoError(t, err)

		writers := []*TrackLocalStaticRTP{vp8WriterA, vp8WriterB, vp8WriterC}

		sender, err := pcOffer.AddTrack(vp8WriterA)
		assert.NoError(t, err)
		assert.NotNil(t, sender)

		assert.NoError(t, sender.AddEncoding(vp8WriterB))
		assert.NoError(t, sender.AddEncoding(vp8WriterC))

		// Counts the OnTrack callbacks whose ReadSimulcastRTCP call has returned.
		var rtcpCounter atomic.Uint64
		pcAnswer.OnTrack(func(trackRemote *TrackRemote, receiver *RTPReceiver) {
			_, _, simulcastReadErr := receiver.ReadSimulcastRTCP(trackRemote.RID())
			assert.NoError(t, simulcastReadErr)
			rtcpCounter.Add(1)
		})

		// Look up the negotiated header extension IDs for MID and RID.
		var midID, ridID uint8
		for _, extension := range sender.GetParameters().HeaderExtensions {
			switch extension.URI {
			case sdp.SDESMidURI:
				midID = uint8(extension.ID)
			case sdp.SDESRTPStreamIDURI:
				ridID = uint8(extension.ID)
			}
		}
		assert.NotZero(t, midID)
		assert.NotZero(t, ridID)

		assert.NoError(t, signalPair(pcOffer, pcAnswer))

		// Keep sending on all three encodings until three callbacks have read RTCP.
		for sequenceNumber := uint16(0); rtcpCounter.Load() < 3; sequenceNumber++ {
			time.Sleep(20 * time.Millisecond)

			for _, track := range writers {
				pkt := &rtp.Packet{
					Header: rtp.Header{
						Version:        2,
						SequenceNumber: sequenceNumber,
						PayloadType:    96,
					},
					Payload: []byte{0x00},
				}
				assert.NoError(t, pkt.Header.SetExtension(midID, []byte("0")))
				assert.NoError(t, pkt.Header.SetExtension(ridID, []byte(track.RID())))

				assert.NoError(t, track.WriteRTP(pkt))
			}
		}

		closePairNow(t, pcOffer, pcAnswer)
	})
}
// simulcastTestTrackLocal embeds a TrackLocalStaticRTP so that tests can
// replace its WriteRTP method while keeping the rest of its method set.
type simulcastTestTrackLocal struct {
*TrackLocalStaticRTP
}
// WriteRTP copies pkt into a pooled packet and writes that copy to every
// binding of the embedded track. The header is passed on as supplied: the
// SSRC and payload type stored in the bindings are not applied, which lets a
// test write packets belonging to different streams. Errors from the
// individual bindings are collected and returned through util.FlattenErrs.
func (s *simulcastTestTrackLocal) WriteRTP(pkt *rtp.Packet) error {
	pooled := getPacketAllocationFromPool()
	defer resetPacketPoolAllocation(pooled)

	*pooled = *pkt

	s.mu.RLock()
	defer s.mu.RUnlock()

	failures := make([]error, 0, len(s.bindings))
	for i := range s.bindings {
		_, writeErr := s.bindings[i].writeStream.WriteRTP(&pooled.Header, pooled.Payload)
		if writeErr == nil {
			continue
		}
		failures = append(failures, writeErr)
	}

	return util.FlattenErrs(failures)
}
// TestPeerConnection_Simulcast_RTX sends two simulcast encodings (RIDs "a" and
// "b") whose SSRCs are stripped from the offer. It checks that:
//   - padding-only packets do not complete the simulcast probe,
//   - OnTrack fires exactly once per RID once packets carry MID and RID,
//   - after packets with payload type 97 and a repair stream ID were sent, a
//     packet read from a remote track reports AttributeRtxPayloadType 97.
func TestPeerConnection_Simulcast_RTX(t *testing.T) {
	lim := test.TimeOut(time.Second * 30)
	defer lim.Stop()

	report := test.CheckRoutines(t)
	defer report()

	rids := []string{"a", "b"}
	pcOffer, pcAnswer, err := newPair()
	assert.NoError(t, err)

	vp8WriterAStatic, err := NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion2", WithRTPStreamID(rids[0]))
	assert.NoError(t, err)

	vp8WriterBStatic, err := NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion2", WithRTPStreamID(rids[1]))
	assert.NoError(t, err)

	// The wrappers write headers as given, so the test picks the SSRC and payload type itself.
	vp8WriterA, vp8WriterB := &simulcastTestTrackLocal{vp8WriterAStatic}, &simulcastTestTrackLocal{vp8WriterBStatic}
	writers := []*simulcastTestTrackLocal{vp8WriterA, vp8WriterB}

	sender, err := pcOffer.AddTrack(vp8WriterA)
	assert.NoError(t, err)
	assert.NotNil(t, sender)

	assert.NoError(t, sender.AddEncoding(vp8WriterB))

	// ridMap counts how often OnTrack fired for each RID.
	var ridMapLock sync.Mutex
	ridMap := map[string]int{}

	assertRidCorrect := func(t *testing.T) {
		ridMapLock.Lock()
		defer ridMapLock.Unlock()

		for _, rid := range rids {
			assert.Equal(t, 1, ridMap[rid])
		}
		assert.Len(t, ridMap, len(rids))
	}

	ridsFulfilled := func() bool {
		ridMapLock.Lock()
		defer ridMapLock.Unlock()

		return len(ridMap) == len(rids)
	}

	var rtxPacketRead atomic.Int32
	// One Done per expected OnTrack callback; each returns once ReadRTP fails.
	var wg sync.WaitGroup
	wg.Add(len(rids))

	pcAnswer.OnTrack(func(trackRemote *TrackRemote, _ *RTPReceiver) {
		ridMapLock.Lock()
		ridMap[trackRemote.RID()]++
		ridMapLock.Unlock()

		defer wg.Done()

		for {
			_, attr, rerr := trackRemote.ReadRTP()
			if rerr != nil {
				break
			}
			if pt, ok := attr.Get(AttributeRtxPayloadType).(byte); ok {
				if pt == 97 {
					rtxPacketRead.Add(1)
				}
			}
		}
	})

	parameters := sender.GetParameters()
	// Guard the index accesses below against a short Encodings slice.
	require.Len(t, parameters.Encodings, len(rids))
	for i, rid := range rids {
		assert.Equal(t, rid, parameters.Encodings[i].RID)
	}

	// Look up the negotiated header extension IDs for MID, RID and repair RID.
	var midID, ridID, rsid uint8
	for _, extension := range parameters.HeaderExtensions {
		switch extension.URI {
		case sdp.SDESMidURI:
			midID = uint8(extension.ID)
		case sdp.SDESRTPStreamIDURI:
			ridID = uint8(extension.ID)
		case sdesRepairRTPStreamIDURI:
			rsid = uint8(extension.ID)
		}
	}
	assert.NotZero(t, midID)
	assert.NotZero(t, ridID)
	assert.NotZero(t, rsid)

	// The parameter is not named sdp so it does not shadow the sdp package.
	err = signalPairWithModification(pcOffer, pcAnswer, func(sessionDescription string) string {
		// Original chrome sdp contains no ssrc info https://pastebin.com/raw/JTjX6zg6
		re := regexp.MustCompile("(?m)[\r\n]+^.*a=ssrc.*$")

		return re.ReplaceAllString(sessionDescription, "")
	})
	assert.NoError(t, err)

	// padding only packets should not affect simulcast probe
	var sequenceNumber uint16
	for sequenceNumber = 0; sequenceNumber < simulcastProbeCount+10; sequenceNumber++ {
		time.Sleep(20 * time.Millisecond)

		for i, track := range writers {
			pkt := &rtp.Packet{
				Header: rtp.Header{
					Version:        2,
					SequenceNumber: sequenceNumber,
					PayloadType:    96,
					Padding:        true,
					SSRC:           uint32(i),
				},
				Payload: []byte{0x00, 0x02},
			}
			assert.NoError(t, track.WriteRTP(pkt))
		}
	}
	assert.False(t, ridsFulfilled(), "Simulcast probe should not be fulfilled by padding only packets")

	// Send packets carrying MID and RID until OnTrack has fired for every RID.
	for ; !ridsFulfilled(); sequenceNumber++ {
		time.Sleep(20 * time.Millisecond)

		for i, track := range writers {
			pkt := &rtp.Packet{
				Header: rtp.Header{
					Version:        2,
					SequenceNumber: sequenceNumber,
					PayloadType:    96,
					SSRC:           uint32(i),
				},
				Payload: []byte{0x00},
			}
			assert.NoError(t, pkt.Header.SetExtension(midID, []byte("0")))
			assert.NoError(t, pkt.Header.SetExtension(ridID, []byte(track.RID())))

			assert.NoError(t, track.WriteRTP(pkt))
		}
	}

	assertRidCorrect(t)

	// Send packets on separate SSRCs (100+j) with payload type 97 and the
	// repair stream ID set (presumably the RTX stream for payload type 96).
	for i := 0; i < simulcastProbeCount+10; i++ {
		sequenceNumber++
		time.Sleep(10 * time.Millisecond)

		for j, track := range writers {
			pkt := &rtp.Packet{
				Header: rtp.Header{
					Version:        2,
					SequenceNumber: sequenceNumber,
					PayloadType:    97,
					SSRC:           uint32(100 + j),
				},
				Payload: []byte{0x00, 0x00, 0x00, 0x00, 0x00},
			}
			assert.NoError(t, pkt.Header.SetExtension(midID, []byte("0")))
			assert.NoError(t, pkt.Header.SetExtension(ridID, []byte(track.RID())))
			assert.NoError(t, pkt.Header.SetExtension(rsid, []byte(track.RID())))

			assert.NoError(t, track.WriteRTP(pkt))
		}
	}

	// Keep the primary streams flowing until a reader has counted an RTX packet.
	for ; rtxPacketRead.Load() == 0; sequenceNumber++ {
		time.Sleep(20 * time.Millisecond)

		for i, track := range writers {
			pkt := &rtp.Packet{
				Header: rtp.Header{
					Version:        2,
					SequenceNumber: sequenceNumber,
					PayloadType:    96,
					SSRC:           uint32(i),
				},
				Payload: []byte{0x00},
			}
			assert.NoError(t, pkt.Header.SetExtension(midID, []byte("0")))
			assert.NoError(t, pkt.Header.SetExtension(ridID, []byte(track.RID())))

			assert.NoError(t, track.WriteRTP(pkt))
		}
	}

	closePairNow(t, pcOffer, pcAnswer)

	wg.Wait()

	assert.Greater(t, rtxPacketRead.Load(), int32(0), "no rtx packet read")
}
// Every time we receive a new SSRC we probe it and try to determine the proper way to handle it.
// In most cases a Track explicitly declares a SSRC and an OnTrack is fired. In two cases we don't
// know the SSRC ahead of time
// * Undeclared SSRC in a single media section (https://github.com/pion/webrtc/issues/880)
// * Simulcast
//
// The Undeclared SSRC processing code would run before Simulcast. If a Simulcast Offer/Answer only
// contained one Media Section we would never fire the OnTrack. We would assume it was a failed
// Undeclared SSRC processing. This test asserts that we properly handle this.
func TestPeerConnection_Simulcast_NoDataChannel(t *testing.T) {
	lim := test.TimeOut(time.Second * 30)
	defer lim.Stop()

	report := test.CheckRoutines(t)
	defer report()

	pcSender, pcReceiver, err := newPair()
	assert.NoError(t, err)

	streamIDs := []string{"a", "b", "c"}

	// One Done per OnTrack callback (one per stream ID) plus one for the sending goroutine.
	var finished sync.WaitGroup
	finished.Add(len(streamIDs) + 1)

	// Released once both peers have reported the connected state.
	var bothConnected sync.WaitGroup
	bothConnected.Add(2)

	onStateChange := func(state PeerConnectionState) {
		if state != PeerConnectionStateConnected {
			return
		}
		bothConnected.Done()
	}
	pcSender.OnConnectionStateChange(onStateChange)
	pcReceiver.OnConnectionStateChange(onStateChange)

	pcReceiver.OnTrack(func(*TrackRemote, *RTPReceiver) {
		finished.Done()
	})

	go func() {
		defer finished.Done()

		// The first stream ID is added as the track; the others become extra encodings of its sender.
		var sender *RTPSender
		writers := make([]*TrackLocalStaticRTP, 0, len(streamIDs))
		for idx, rid := range streamIDs {
			writer, trackErr := NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion", WithRTPStreamID(rid))
			assert.NoError(t, trackErr)

			if idx == 0 {
				var addErr error
				sender, addErr = pcSender.AddTrack(writer)
				assert.NoError(t, addErr)
				assert.NotNil(t, sender)
			} else {
				assert.NoError(t, sender.AddEncoding(writer))
			}

			writers = append(writers, writer)
		}

		// Look up the negotiated header extension IDs for MID, RID and repair RID.
		var midID, ridID, rsidID uint8
		for _, ext := range sender.GetParameters().HeaderExtensions {
			switch ext.URI {
			case sdp.SDESMidURI:
				midID = uint8(ext.ID)
			case sdp.SDESRTPStreamIDURI:
				ridID = uint8(ext.ID)
			case sdesRepairRTPStreamIDURI:
				rsidID = uint8(ext.ID)
			}
		}
		assert.NotZero(t, midID)
		assert.NotZero(t, ridID)
		assert.NotZero(t, rsidID)

		// signaling
		offer, err := pcSender.CreateOffer(nil)
		assert.NoError(t, err)
		assert.NoError(t, pcSender.SetLocalDescription(offer))
		assert.NoError(t, pcReceiver.SetRemoteDescription(offer))

		answer, err := pcReceiver.CreateAnswer(nil)
		assert.NoError(t, err)

		// Wait for gathering to complete before handing the answer to the sender.
		gatheringDone := GatheringCompletePromise(pcReceiver)
		assert.NoError(t, pcReceiver.SetLocalDescription(answer))
		<-gatheringDone

		assert.NoError(t, pcSender.SetRemoteDescription(*pcReceiver.LocalDescription()))

		bothConnected.Wait()

		// Send 100 packets; each one is re-tagged with RID and MID and written to every encoding in turn.
		for seqNo := uint16(0); seqNo < 100; seqNo++ {
			pkt := &rtp.Packet{
				Header: rtp.Header{
					Version:        2,
					SequenceNumber: seqNo,
					PayloadType:    96,
				},
				Payload: []byte{0x00, 0x00},
			}

			for idx, writer := range writers {
				assert.NoError(t, pkt.SetExtension(ridID, []byte(streamIDs[idx])))
				assert.NoError(t, pkt.SetExtension(midID, []byte(sender.rtpTransceiver.Mid())))
				assert.NoError(t, writer.WriteRTP(pkt))
			}
		}
	}()

	finished.Wait()

	closePairNow(t, pcSender, pcReceiver)
}
// Check that PayloadType of 0 is handled correctly. At one point
// we incorrectly assumed 0 meant an invalid stream and wouldn't update things
// properly
//
// The test sends PCMU audio (presumably registered with payload type 0 in the
// default codecs) and waits for OnTrack to fire on the answerer.
func TestPeerConnection_Zero_PayloadType(t *testing.T) {
	lim := test.TimeOut(time.Second * 5)
	defer lim.Stop()

	report := test.CheckRoutines(t)
	defer report()

	pcOffer, pcAnswer, err := newPair()
	require.NoError(t, err)

	audioTrack, err := NewTrackLocalStaticSample(RTPCodecCapability{MimeType: MimeTypePCMU}, "audio", "audio")
	require.NoError(t, err)

	_, err = pcOffer.AddTrack(audioTrack)
	require.NoError(t, err)

	assert.NoError(t, signalPair(pcOffer, pcAnswer))

	onTrackFired, onTrackFiredCancel := context.WithCancel(context.Background())
	pcAnswer.OnTrack(func(track *TrackRemote, _ *RTPReceiver) {
		// Use assert, not require, here: FailNow must only be called from the
		// test goroutine, and it would skip the cancel below and hang the test.
		assert.Equal(t, MimeTypePCMU, track.Codec().MimeType)
		onTrackFiredCancel()
	})

	go func() {
		ticker := time.NewTicker(20 * time.Millisecond)
		defer ticker.Stop()

		// Keep writing samples until OnTrack fires. A single sample may be sent
		// before the connection is ready and would then never trigger OnTrack.
		for {
			select {
			case <-onTrackFired.Done():
				return
			case <-ticker.C:
				if routineErr := audioTrack.WriteSample(media.Sample{Data: []byte{0x00}, Duration: time.Second}); routineErr != nil {
					fmt.Println(routineErr)
				}
			}
		}
	}()

	<-onTrackFired.Done()
	closePairNow(t, pcOffer, pcAnswer)
}
1
https://gitee.com/mirrors/Pion-WebRTC.git
git@gitee.com:mirrors/Pion-WebRTC.git
mirrors
Pion-WebRTC
Pion-WebRTC
master

搜索帮助